package simpledb;

import java.util.*;

/**
 * The Join operator implements the relational join operation.
 * <p>
 * This implementation is a simple nested loops join: for every tuple of the
 * left (outer) child it scans the right (inner) child and emits the
 * concatenation of each pair accepted by the join predicate.
 */
public class Join extends Operator {

    private static final long serialVersionUID = 1L;

    private final JoinPredicate pred;
    private final DbIterator child1, child2;

    /**
     * The outer tuple currently being matched against the inner child, or
     * null when the next outer tuple still has to be pulled from child1.
     * Kept across fetchNext() calls so a scan of the inner child can resume
     * after a match has been returned.
     */
    private Tuple t1 = null;

    /**
     * Constructor.  Accepts two children to join and the predicate
     * to join them on.
     *
     * @param p The predicate to use to join the children
     * @param child1 Iterator for the left(outer) relation to join
     * @param child2 Iterator for the right(inner) relation to join
     */
    public Join(JoinPredicate p, DbIterator child1, DbIterator child2) {
        this.pred = p;
        this.child1 = child1;
        this.child2 = child2;
    }

    /**
     * @return the schema of the joined tuples: the fields of the left child
     *         followed by the fields of the right child.
     * @see simpledb.TupleDesc#merge(TupleDesc, TupleDesc) for possible implementation logic.
     */
    public TupleDesc getTupleDesc() {
        return TupleDesc.merge(child1.getTupleDesc(), child2.getTupleDesc());
    }

    /**
     * Opens this operator and both of its children, and discards any outer
     * tuple left over from a previous scan.
     */
    public void open()
        throws DbException, NoSuchElementException, TransactionAbortedException {
        super.open();
        child1.open();
        child2.open();
        // Start from a clean state even if the operator was used before.
        t1 = null;
    }

    /**
     * Closes both children and then this operator, dropping the cached
     * outer tuple.
     */
    public void close() {
        child1.close();
        child2.close();
        super.close();
        t1 = null;
    }

    /**
     * Restarts the join from the beginning by rewinding both children.
     */
    public void rewind() throws DbException, TransactionAbortedException {
        child1.rewind();
        child2.rewind();
        // Without this reset the outer tuple cached before the rewind would be
        // joined against the whole inner relation again, ahead of the first
        // outer tuple, producing spurious results.
        t1 = null;
    }

    /**
     * Returns the next tuple generated by the join, or null if there are no more tuples.
     * Logically, this is the next tuple in r1 cross r2 that satisfies the join
     * predicate.  There are many possible implementations; the simplest is a
     * nested loops join.
     * <p>
     * Note that the tuples returned from this particular implementation of
     * Join are simply the concatenation of joining tuples from the left and
     * right relation. Therefore, if an equality predicate is used
     * there will be two copies of the join attribute
     * in the results.  (Removing such duplicate columns can be done with an
     * additional projection operator if needed.)
     * <p>
     * For example, if one tuple is {1,2,3} and the other tuple is {1,5,6},
     * joined on equality of the first column, then this returns {1,2,3,1,5,6}.
     *
     * @return The next matching tuple.
     * @see JoinPredicate#filter
     */
    protected Tuple fetchNext() throws TransactionAbortedException, DbException {
        while (t1 != null || child1.hasNext()) {
            // Only advance the outer child when the previous outer tuple has
            // been fully matched against the inner child.
            if (t1 == null) {
                t1 = child1.next();
            }

            // Resume (or start) the scan of the inner child for this outer tuple.
            while (child2.hasNext()) {
                Tuple t2 = child2.next();
                if (pred.filter(t1, t2)) {
                    return mergeTwoTuples(t1, t2);
                }
            }

            // Inner child exhausted: move on to the next outer tuple and
            // restart the inner scan.
            t1 = null;
            child2.rewind();
        }

        return null;
    }

    /**
     * Builds a result tuple holding the fields of the left tuple followed by
     * the fields of the right tuple.
     *
     * @param left tuple produced by child1
     * @param right tuple produced by child2
     * @return a new tuple, using this operator's tuple descriptor, with the
     *         fields of both inputs copied in order
     */
    private Tuple mergeTwoTuples(Tuple left, Tuple right) {
        Tuple merged = new Tuple(this.getTupleDesc());

        // Field counts are loop-invariant; look them up once.
        int leftCount = child1.getTupleDesc().numFields();
        int rightCount = child2.getTupleDesc().numFields();

        for (int i = 0; i < leftCount; i++) {
            merged.setField(i, left.getField(i));
        }
        for (int i = 0; i < rightCount; i++) {
            merged.setField(leftCount + i, right.getField(i));
        }

        return merged;
    }
}
